- Published on
3.1.Flask 接口认证
- Authors

- Name
- xiaobai
1.目标与选型
- 目标:保护受控资源,仅允许已登录用户访问;实现注册、登录颁发令牌(access/refresh)、令牌校验与刷新、白名单与统一错误返回。
- 方案对比:
- PyJWT 手动集成:灵活可控,适合自定义需求;
- Flask-JWT-Extended:封装完善,支持装饰器、刷新令牌与附加 Claims。
依赖安装:
pip install flask PyJWT passlib[bcrypt]
# 或(使用扩展)
pip install flask Flask-JWT-Extended
2.通用配置与密码哈希
# config/auth.py
import ast
import os

# JWT settings. Each value can be overridden through the environment variable
# of the same name; the literals below are the fallbacks used when it is unset.
JWT_SECRET_KEY = os.getenv('JWT_SECRET_KEY', 'dev-secret')
JWT_ALGORITHM = os.getenv('JWT_ALGORITHM', 'HS256')
JWT_ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv('JWT_ACCESS_TOKEN_EXPIRE_MINUTES', '30'))
JWT_REFRESH_TOKEN_EXPIRE_DAYS = int(os.getenv('JWT_REFRESH_TOKEN_EXPIRE_DAYS', '7'))
# Only the exact (case-insensitive) string "true" enables global authentication.
JWT_GLOBAL_ENABLE = os.getenv('JWT_GLOBAL_ENABLE', 'false').lower() == 'true'


def parse_white_list(raw: str) -> set[str]:
    """Parse a whitelist written as a Python list/tuple/set literal of strings.

    The value comes from the environment, so it is parsed with
    ``ast.literal_eval`` (literals only) instead of ``eval``, which would
    execute arbitrary code found in the variable.

    Args:
        raw: Text such as ``"['/login', '/registry']"``.

    Returns:
        The set of paths contained in the literal.

    Raises:
        ValueError: If ``raw`` is not a literal, is not a list/tuple/set, or
            contains anything other than strings.
    """
    try:
        parsed = ast.literal_eval(raw)
    except (ValueError, TypeError, SyntaxError) as exc:
        raise ValueError(f'JWT_WHITE_LIST is not a valid literal: {raw!r}') from exc
    if not isinstance(parsed, (list, tuple, set, frozenset)):
        raise ValueError(f'JWT_WHITE_LIST must be a list of paths, got: {raw!r}')
    if not all(isinstance(path, str) for path in parsed):
        raise ValueError(f'JWT_WHITE_LIST must contain only strings, got: {raw!r}')
    return set(parsed)


JWT_WHITE_LIST = parse_white_list(os.getenv('JWT_WHITE_LIST', "['/login','/registry','/verify']"))
from passlib.context import CryptContext
# Shared passlib context used by the password helpers in this module;
# bcrypt is the only scheme configured.
pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
def hash_password(plain: str) -> str:
    """Hash a plaintext password with the module-level ``pwd_context``.

    Args:
        plain: The password as entered by the user.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(plain)
    return hashed
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored hash.

    Args:
        plain: The password supplied by the client.
        hashed: The hash previously produced by ``hash_password``.

    Returns:
        Whatever ``pwd_context.verify`` reports for the pair.
    """
    matches = pwd_context.verify(plain, hashed)
    return matches
3.方案一:PyJWT 手动实现(access/refresh)
3.1.令牌工具
# utils/jwt_utils.py
import jwt
from datetime import datetime, timezone, timedelta
from config.auth import (
JWT_SECRET_KEY, JWT_ALGORITHM,
JWT_ACCESS_TOKEN_EXPIRE_MINUTES, JWT_REFRESH_TOKEN_EXPIRE_DAYS,
)
def create_access_token(username: str, minutes: int | None = None) -> str:
    """Issue a signed access token for ``username``.

    Args:
        username: Stored in the ``sub`` claim.
        minutes: Token lifetime in minutes. ``None`` selects
            ``JWT_ACCESS_TOKEN_EXPIRE_MINUTES``.

    Returns:
        The encoded JWT carrying ``sub``, ``type='access'`` and ``exp``.
    """
    # Compare against None rather than relying on truthiness, so an explicit
    # 0 (an immediately expiring token) is honoured instead of being replaced
    # by the default lifetime.
    lifetime = JWT_ACCESS_TOKEN_EXPIRE_MINUTES if minutes is None else minutes
    exp = datetime.now(timezone.utc) + timedelta(minutes=lifetime)
    claims = {'sub': username, 'type': 'access', 'exp': exp}
    return jwt.encode(claims, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
def create_refresh_token(username: str, days: int | None = None) -> str:
    """Issue a signed refresh token for ``username``.

    Args:
        username: Stored in the ``sub`` claim.
        days: Token lifetime in days. ``None`` selects
            ``JWT_REFRESH_TOKEN_EXPIRE_DAYS``.

    Returns:
        The encoded JWT carrying ``sub``, ``type='refresh'`` and ``exp``.
    """
    # Compare against None rather than relying on truthiness, so an explicit
    # 0 is honoured instead of being replaced by the default lifetime.
    lifetime = JWT_REFRESH_TOKEN_EXPIRE_DAYS if days is None else days
    exp = datetime.now(timezone.utc) + timedelta(days=lifetime)
    claims = {'sub': username, 'type': 'refresh', 'exp': exp}
    return jwt.encode(claims, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
def decode_token(token: str) -> dict | None:
    """Decode and verify a JWT signed with ``JWT_SECRET_KEY``.

    Args:
        token: The encoded token taken from the request.

    Returns:
        The decoded claims, or ``None`` when the token is rejected
        (malformed, bad signature, expired, or otherwise invalid).
    """
    try:
        return jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])
    except jwt.InvalidTokenError:
        # Only token-validation failures mean "not authenticated". Other
        # exceptions (e.g. a misconfigured key or algorithm) are left to
        # propagate so they are not silently reported as a bad token.
        return None
3.2.用户模型与仓库(示例)
# models/user_repo.py(演示用,生产替换为数据库)
# In-memory user store keyed by username.
USERS = {}


def add_user(username: str, hashed_password: str, email: str):
    """Insert (or overwrite) the record for ``username`` in ``USERS``.

    The stored record holds the username, the hashed password, the email and
    a ``valid`` flag that is always set to ``True`` here.
    """
    record = dict(
        username=username,
        hashed_password=hashed_password,
        email=email,
        valid=True,
    )
    USERS[username] = record


def get_user(username: str):
    """Return the stored record for ``username``, or ``None`` if absent."""
    try:
        return USERS[username]
    except KeyError:
        return None
3.3.Blueprint 与路由
# routes/auth.py
from flask import Blueprint, request
from functools import wraps
from config.auth import JWT_GLOBAL_ENABLE, JWT_WHITE_LIST
from utils.jwt_utils import create_access_token, create_refresh_token, decode_token
from config.auth import hash_password, verify_password
from models.user_repo import add_user, get_user
# Blueprint named 'auth' that the routes below are attached to.
auth = Blueprint('auth', __name__)
def ok(data=None, status=200):
    """Build the success envelope.

    Args:
        data: Payload placed under the ``data`` key.
        status: HTTP status code returned alongside the body.

    Returns:
        A ``(body, status)`` tuple where body has ``code`` 0 and
        ``message`` ``'ok'``.
    """
    envelope = {'code': 0, 'message': 'ok'}
    envelope['data'] = data
    return envelope, status
def err(message, status=400, code=1):
    """Build the error envelope.

    Args:
        message: Human-readable error text.
        status: HTTP status code returned alongside the body.
        code: Application-level error code placed in the body.

    Returns:
        A ``(body, status)`` tuple whose ``data`` field is always ``None``.
    """
    body = dict(code=code, message=message, data=None)
    return body, status
@auth.post('/registry')
def registry():
    """Register a new user from a JSON body.

    Expects ``username``, ``password`` and ``email`` as non-empty strings.

    Returns:
        201 with the username on success, 422 when a field is missing or not
        a non-empty string, 409 when the username is already taken.
    """
    body = request.get_json(silent=True)
    # A JSON array or scalar has no .get(); treat it like an empty object so
    # the request is answered with 422 instead of an unhandled exception.
    if not isinstance(body, dict):
        body = {}
    username, password, email = body.get('username'), body.get('password'), body.get('email')
    # Require real, non-empty strings: other JSON types (numbers, lists,
    # objects) must not reach the user lookup or the password hasher.
    if not all(isinstance(value, str) and value for value in (username, password, email)):
        return err('username/password/email required', 422)
    if get_user(username):
        return err('user exists', 409)
    add_user(username, hash_password(password), email)
    return ok({'username': username}, 201)
@auth.post('/login')
def login():
    """Authenticate a user and issue an access/refresh token pair.

    Expects ``username`` and ``password`` as strings in a JSON body.

    Returns:
        200 with ``access_token``, ``refresh_token`` and ``token_type`` on
        success, 401 for missing, malformed or wrong credentials.
    """
    body = request.get_json(silent=True)
    # A JSON array or scalar has no .get(); treat it like an empty object.
    if not isinstance(body, dict):
        body = {}
    username, password = body.get('username'), body.get('password')
    # Reject missing or non-string credentials up front with the same 401 as
    # a wrong password, so the user lookup and the password check only ever
    # receive strings.
    if not isinstance(username, str) or not isinstance(password, str):
        return err('invalid credentials', 401)
    u = get_user(username)
    if not u or not verify_password(password, u['hashed_password']):
        return err('invalid credentials', 401)
    return ok({
        'access_token': create_access_token(username),
        'refresh_token': create_refresh_token(username),
        'token_type': 'bearer'
    })
def require_auth(func):
    """Decorator that lets a view run only with a valid access token.

    The token is read from an ``Authorization: Bearer <token>`` header. On
    success the token's ``sub`` claim is stored on ``request.current_user``
    before the wrapped view is called; otherwise a 401 error envelope is
    returned.
    """
    prefix = 'Bearer '

    @wraps(func)
    def wrapper(*args, **kwargs):
        header_value = request.headers.get('Authorization', '')
        if not header_value.startswith(prefix):
            return err('missing bearer', 401)
        # Everything after the first space is the encoded token.
        claims = decode_token(header_value[len(prefix):])
        if claims and claims.get('type') == 'access':
            request.current_user = claims.get('sub')
            return func(*args, **kwargs)
        return err('invalid or expired token', 401)

    return wrapper
@auth.post('/token/refresh')
def refresh():
    """Exchange a refresh token for a new access token.

    The refresh token is read from an ``Authorization: Bearer <token>``
    header and must decode to claims whose ``type`` is ``'refresh'``.

    Returns:
        200 with a fresh ``access_token`` for the token's ``sub``, or a 401
        error envelope.
    """
    prefix = 'Bearer '
    header_value = request.headers.get('Authorization', '')
    if not header_value.startswith(prefix):
        return err('missing bearer', 401)
    # Everything after the first space is the encoded token.
    claims = decode_token(header_value[len(prefix):])
    if claims and claims.get('type') == 'refresh':
        subject = claims.get('sub')
        new_access = create_access_token(subject)
        return ok({'access_token': new_access, 'token_type': 'bearer'})
    return err('invalid or expired refresh token', 401)
@auth.get('/me')
@require_auth
def me():
    """Return the current user, as stored on the request by ``require_auth``."""
    current = request.current_user
    return ok({'user': current})
3.4.全局认证(白名单)
# 在应用初始化处注册 before_request
from flask import Flask, request
from routes.auth import auth, err
from config.auth import JWT_GLOBAL_ENABLE, JWT_WHITE_LIST
from utils.jwt_utils import decode_token
# Create the application and attach the auth blueprint (no URL prefix is
# given, so its routes are served at the paths declared on the blueprint).
app = Flask(__name__)
app.register_blueprint(auth)
@app.before_request
def global_auth():
    """Application-wide access-token check, run before every request.

    Does nothing unless ``JWT_GLOBAL_ENABLE`` is true. Requests whose path is
    in ``JWT_WHITE_LIST`` are let through, as is the refresh endpoint. All
    other requests must carry a valid access token in an
    ``Authorization: Bearer <token>`` header.

    Returns:
        ``None`` to let the request proceed, or a 401 error envelope.
    """
    if not JWT_GLOBAL_ENABLE:
        return None
    path = request.path
    # /token/refresh authenticates itself with a *refresh* token. Without this
    # exemption the access-token check below would reject every refresh
    # request whenever global authentication is enabled.
    if path in JWT_WHITE_LIST or path == '/token/refresh':
        return None
    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return err('missing bearer', 401)
    payload = decode_token(auth_header.split(' ', 1)[1])
    if not payload or payload.get('type') != 'access':
        return err('invalid or expired token', 401)
    request.current_user = payload.get('sub')
    return None
4.方案二:Flask-JWT-Extended(快速接入)
from flask import Flask, request
from flask_jwt_extended import (
JWTManager, create_access_token, create_refresh_token,
jwt_required, get_jwt_identity
)
import os

app = Flask(__name__)
# Read the signing key from the environment so a real secret is never
# committed; 'dev-secret' remains the fallback for local development only.
app.config['JWT_SECRET_KEY'] = os.getenv('JWT_SECRET_KEY', 'dev-secret')
jwt = JWTManager(app)
@app.post('/login')
def login():
    """Issue an access/refresh token pair for the submitted username.

    Returns:
        The two tokens on success, or a 422 error body when ``username`` is
        missing or not a non-empty string.
    """
    body = request.get_json()
    # A JSON array or scalar has no .get(); treat it like an empty object.
    if not isinstance(body, dict):
        body = {}
    username = body.get('username')
    # Never mint tokens for a missing identity.
    if not isinstance(username, str) or not username:
        return {'code': 1, 'message': 'username required', 'data': None}, 422
    # NOTE: credential verification is elided in this example; the username
    # and password must be checked before the tokens below are issued.
    access = create_access_token(identity=username)
    refresh = create_refresh_token(identity=username)
    return {'access_token': access, 'refresh_token': refresh}
@app.get('/me')
@jwt_required()
def me():
    """Return the identity carried by the verified JWT of this request."""
    identity = get_jwt_identity()
    return {'user': identity}
对比:扩展提供装饰器与刷新令牌等能力,减少样板代码;若需更强自定义(如白名单、复合校验),PyJWT 方案更灵活。
5.错误返回与安全建议
- 统一返回结构:`{'code','message','data'}`,便于前端消费与观测。
- 将令牌放在 `Authorization: Bearer` 请求头中,避免置于 URL;如用 Cookie,配合 CSRF 防护。
- 刷新令牌建议做轮换与黑名单(服务端维护失效列表)。
- 明文密码永不入库,使用安全哈希(bcrypt);强制复杂度与限流防暴力破解。
6.curl 调试示例
# 注册
curl -X POST http://127.0.0.1:5000/registry \
-H 'Content-Type: application/json' \
-d '{"username":"alice","password":"123456","email":"alice@test.com"}'
# 登录
curl -X POST http://127.0.0.1:5000/login \
-H 'Content-Type: application/json' \
-d '{"username":"alice","password":"123456"}'
# 访问受控资源
curl -H 'Authorization: Bearer <access>' http://127.0.0.1:5000/me
# 刷新令牌
curl -X POST -H 'Authorization: Bearer <refresh>' http://127.0.0.1:5000/token/refresh
7.总结与扩展
- 两种认证方案各有适用场景;建议结合白名单、全局钩子与统一错误格式使用。
- 进一步可接入角色权限(RBAC)、审计日志、会话失效与设备绑定等能力。
